GCS にファイルが配置されたらイベント駆動で BigQuery にデータロードするサーバレスなジョブをつくってみた
こんにちは、みかみです。
やりたいこと
- GCS のファイルを BigQuery にロードするジョブを手軽に(サーバレスで)実装したい
- GCS にファイルが配置されたタイミングでイベント駆動でデータロードジョブを実行したい
GCS バケットを作成
以下のチュートリアルを参考に、gcloud コマンドで GCS バケットと Cloud Functions 関数をデプロイします。
Cloud Shell を使えば、ブラウザ上でコマンドラインが実行でき、gcloud ツールもプリインストールされているので便利です。
まずはデータ連携用の GCS バケットを作成します。
gcp_da_user@cloudshell:~/load (cm-da-mikami-yuki-258308)$ gsutil mb gs://job-mikami-data-load Creating gs://job-mikami-data-load/...
GCP 管理画面からも、バケットが作成されたことが確認できました。
なお、GCS から BigQuery へデータロードする場合、GCS バケットと BigQuery のデータセットは同じロケーションに配置する必要があります。
BigQuery にデータロードする関数を Cloud Functions にデプロイ
引き続き Cloud Shell を使って、GCS のファイルデータを BigQuery にロードする関数を Cloud Functions にデプロイします。
今回は Python でコーディングしましたが、Cloud Functions の関数は、他に Node.js や Go、Java で実装することも可能です。
以下の Python コードを main.py
というファイル名で保存しました。
from google.cloud import bigquery def load_data(data, context): # check content-type if data['contentType'] != 'text/csv': print('Not supported file type: {}'.format(data['contentType'])) return # get file info bucket_name = data['bucket'] file_name = data['name'] uri = 'gs://{}/{}'.format(bucket_name, file_name) dataset_id = 'load_from_gcs' table_id = 'table_sample' bq = bigquery.Client() dataset_ref = bq.dataset(dataset_id) # Set Load Config job_config = bigquery.LoadJobConfig() job_config.autodetect = True job_config.source_format = bigquery.SourceFormat.CSV job_config.write_disposition = 'WRITE_APPEND' # Load data load_job = client.load_table_from_uri( uri, dataset_ref.table(table_id), job_config=job_config ) print("Starting job {}".format(load_job.job_id)) load_job.result() print("Job finished.")
import
が必要な外部ライブラリは requirements.txt
ファイルで保存します。
google-cloud-bigquery==1.24.0
Cloud Storage トリガーを指定した場合、Cloud Functions 関数の data
パラメータとして Cloud Storage object が渡されます。
Cloud Functions の Cloud Storage トリガーには、以下の4種類のイベントがあります。
- ファイナライズ / 作成:オブジェクトの作成または上書き
- 削除:オブジェクトの削除
- アーカイブ:バージョニング対応バケットの場合に、オブジェクトのアーカイブまたは削除
- メタデータを更新:オブジェクトのメタデータ変更
今回は連携ファイルが GCS に配置されたことをトリガーに BigQuery へのロード処理を実行したいので、トリガーには google.storage.object.finalize
を指定します。
main.py
と requirements.txt
ファイルを配置してあるディレクトリで、以下のコマンド実行します。
※deploy
する関数名と --trigger-resource
で指定する GCS バケット名は環境に合わせて変更する必要があります。
gcloud functions deploy load_data --runtime python37 --trigger-resource job-mikami-data-load --trigger-event google.storage.object.finalize
gcp_da_user@cloudshell:~/load (cm-da-mikami-yuki-258308)$ gcloud functions deploy load_data --runtime python37 --trigger-resource job-mikami-data-load --trigger-event google.storage.object.finalize Allow unauthenticated invocations of new function [load_data]? (y/N)? y Deploying function (may take a while - up to 2 minutes)...done. availableMemoryMb: 256 entryPoint: load_data eventTrigger: eventType: google.storage.object.finalize failurePolicy: {} resource: projects/_/buckets/job-mikami-data-load service: storage.googleapis.com ingressSettings: ALLOW_ALL labels: deployment-tool: cli-gcloud name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/load_data runtime: python37 serviceAccountEmail: cm-da-mikami-yuki-258308@appspot.gserviceaccount.com sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/be62c39f-43fa-4d2d-8bed-4011eb493a96.zip?GoogleAccessId=service-797147019523@gcf-admin-robot.ia m.gserviceaccount.com&Expires=1594008965&Signature=wVQjoPo7nrVwNAZIITMeZKcm%2Fu1hovjSlhX1gcDJ5JhACFHh6gx66H%2BOzVi%2BcDkMA%2B6G2pUSYT8wnm9PdRx6lmTJ4m27kpRtbPRBMfjM5oEALR1i0WpNxSV362yGjFLHgwddzaqfKgGXSUpU MykkE9ZwpzFpHqT%2FvPPRNytUez6bDPrE%2Fkbmfi1B%2FK9AZQBRq5ZftakR1%2FqJjIOU5iZyfSSS%2FNegzNldff2uWwRGQknET2QBLEq6P0%2BOzSlEayS23krsrKSoNiLG3XUri8gbpNdaBg7iZ7exXaK4lUOz0li2%2FL14YSPYOuc10vD21knLU1hZwnjhAxiLD 6A%2FYLKj1Q%3D%3D status: ACTIVE timeout: 60s updateTime: '2020-07-06T03:47:29.542Z' versionId: '1'
Cloud Functions 関数が正常にデプロイされました。
サンプルデータ作成用の Cloud Functions 関数を準備
動作確認のため、サンプルデータを作成して GCS に CSV ファイルとして Put する関数を Cloud Functions にデプロイし、Cloud Scheduler でスケジュール実行してみます。
- Google Cloud Pub/Sub トリガー | Cloud Functions ドキュメント
- Cloud Pub/Sub のチュートリアル | Cloud Functions ドキュメント
- Cloud Scheduler クイックスタート | Cloud Scheduler ドキュメント
以下の Python コードと requirements.txt
を準備しました。
import pandas as pd from random import random from datetime import datetime from google.cloud import storage def make_sample_data(data, context): now = datetime.now() df=pd.DataFrame({'value':[random() for _ in range(10)], 'create_time':[datetime.now() for _ in range(10)]}) df = df[['value', "create_time"]] client = storage.Client() bucket_name = 'job-mikami-data-load' blob_name = 'input/sample_{}.csv'.format(datetime.now().strftime('%Y%m%d%H%M%S')) bucket = client.get_bucket(bucket_name) blob = bucket.blob(blob_name) blob.upload_from_string(data=df.to_csv(index=False), content_type='text/csv') print("Blob {} created.".format(blob_name))
google-cloud-storage==1.28.1 pandas==1.0.0
トリガーに Pub /Sub トピックを指定して Cloud Functions に関数をデプロイします。
gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud functions deploy make_sample_data --runtime python37 --trigger-topic make_sample_data Allow unauthenticated invocations of new function [make_sample_data]? (y/N)? y Deploying function (may take a while - up to 2 minutes)...done. availableMemoryMb: 256 entryPoint: make_sample_data eventTrigger: eventType: google.pubsub.topic.publish failurePolicy: {} resource: projects/cm-da-mikami-yuki-258308/topics/make_sample_data service: pubsub.googleapis.com ingressSettings: ALLOW_ALL labels: deployment-tool: cli-gcloud name: projects/cm-da-mikami-yuki-258308/locations/us-central1/functions/make_sample_data runtime: python37 serviceAccountEmail: cm-da-mikami-yuki-258308@appspot.gserviceaccount.com sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-2365985c-f7e0-4882-9f01-971d82702e1f/bfbca735-4c04-40ff-8569-d65d7533703c.zip?GoogleAccessId=service-797147019523@gcf-admin-robot.ia m.gserviceaccount.com&Expires=1594012319&Signature=UE5utvgBdIyRh88Xcn3zekP5Tpk%2B1jVdgLa%2F5aLh3mBTBCWrqhqKWJpLXW6khvtwnbhw3qzMMb2TqMNQZP1FOU7IP8XV6%2B368rj7f5tm7tF9GG9z4opC8gFVqNb%2BSvtk%2FDExG9yxpVjc%2 FXDUEJ8yty0FOmWTXmmLMeLZFv9Ha38xp%2FzL%2BLkRW6tLKxwcnAfRpOyhjytf0XaNWT9v15sLdbKflCJfQT5gxztu1Rx17%2Frm6E1EnoefEIQEAUitWL3Qz1VwzYZiM1uA7wThoQRKMdiBninGO4Dlc9U%2FNsKE5P57d8bspSmB6D1bz9gmnZhYcKvV5vUvlL0VQnA Yhf1rAg%3D%3D status: ACTIVE timeout: 60s updateTime: '2020-07-06T04:43:49.075Z' versionId: '1'
正常にデプロイできました。
続いて Pub/Sub のサブスクリプションを作成し、Cloud Scheduler のジョブを作成します。
gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud pubsub subscriptions create gcf-make-sample-data --topic make_sample_data Created subscription [projects/cm-da-mikami-yuki-258308/subscriptions/gcf-make-sample-data]. gcp_da_user@cloudshell:~/sample (cm-da-mikami-yuki-258308)$ gcloud scheduler jobs create pubsub make_sample_data --schedule "*/5 * * * *" --topic make_sample_data --message-body "sample" name: projects/cm-da-mikami-yuki-258308/locations/asia-northeast1/jobs/make_sample_data pubsubTarget: data: c2FtcGxl topicName: projects/cm-da-mikami-yuki-258308/topics/make_sample_data retryConfig: maxBackoffDuration: 3600s maxDoublings: 16 maxRetryDuration: 0s minBackoffDuration: 5s schedule: '*/5 * * * *' state: ENABLED timeZone: Etc/UTC userUpdateTime: '2020-07-06T04:47:40Z'
これで、5分ごとにサンプルデータを連携バケットに Put する準備ができました。
実行結果を確認
5 分ほど待ってから GCS バケットを確認してみると、Cloud Scheduler のジョブが実行され、GCS にサンプルデータが出力されていることが確認できました。
BigQuery にも GCS のファイルデータが正常にロードされました。
約 1 時間後、期待通り 5 分ごとに複数のサンプルデータファイルが出力されていることを確認し、
ファイル作成のたびにちゃんとデータが BigQuery にロードされているかどうか確認してみます。
gcp_da_user@cloudshell:~ (cm-da-mikami-yuki-258308)$ bq query --nouse_legacy_sql \ > 'SELECT > create_time, > count > FROM ( > SELECT > FORMAT_TIME("%R", CAST(create_time as TIME)) as create_time, > count(*) as count > FROM > `cm-da-mikami-yuki-258308.load_from_gcs.table_load` > GROUP BY > FORMAT_TIME("%R", CAST(create_time as TIME)) > ) > ORDER BY > create_time' Waiting on bqjob_r6c8c13626ba9f6c1_0000017322bb387c_1 ... (0s) Current status: DONE +-------------+-------+ | create_time | count | +-------------+-------+ | 04:50 | 10 | | 04:55 | 10 | | 05:00 | 10 | | 05:05 | 10 | | 05:10 | 10 | | 05:15 | 10 | | 05:20 | 10 | | 05:25 | 10 | | 05:30 | 10 | | 05:35 | 10 | | 05:40 | 10 | | 05:45 | 10 | | 05:50 | 10 | | 05:55 | 10 | | 06:00 | 10 | | 06:05 | 10 | +-------------+-------+
期待通り、GCS にファイルオブジェクトが作成タイミングでイベント駆動で Cloud Functions 関数が実行され、全てのサンプルデータが BigQuery にロードされたことが確認できました。
まとめ(所感)
GCP の Cloud Functions を使えば、AWS の Lambda のように、ファイルストレージにオブジェクトが作成されたタイミングでイベント駆動で必要な処理を実行することができます。
GCS から BigQuery にデータロードする関数を Cloud Functions にデプロイすることで、サーバー環境やツールなどの大掛かりな準備なしで、データファイルが GCS に配置されたタイミングで即座に連携データを BigQuery にロードするジョブを手軽に実装することができました。
動作確認やデプロイなどに Google Cloud SDK や クライアントライブラリを使いたい場合、Cloud Shell を使えば実行環境の準備をする必要もないので、ジョブ実装のハードルはさらに下がります。
Cloud Functions の関数には、最大 9 分のタイムアウト制限や、最大 2,048 MBのメモリ制限があるので、連携データが大量な場合などには考慮が必要かと思いますが、環境構築などの手間なく低コストで使用できる Cloud Functions は非常に使いやすいと思いました。
参考
- Cloud Functions の概要 | Cloud Functions ドキュメント
- イベントとトリガー | Cloud Functions ドキュメント
- Google Cloud Storage トリガー | Cloud Functions ドキュメント
- Google Cloud Pub/Sub トリガー | Cloud Functions ドキュメント
- Cloud Functions 実行環境 | Cloud Functions ドキュメント
- 割り当て | Cloud Functions ドキュメント
- タイムアウト | Cloud Functions ドキュメント
- リソースに関する上限 | Cloud Functions ドキュメント
- 料金 | Cloud Functions ドキュメント
- Cloud Storage の Pub/Sub 通知 | Cloud Storage ドキュメント
- Pub/Sub Notifications for Cloud Storage の使用法 | Cloud Storage ドキュメント
- Cloud Scheduler クイックスタート | Cloud Scheduler ドキュメント
- Cloud Shell の起動 | Cloud Shell ドキュメント
- Cloud Shell での gcloud コマンドの実行 | Cloud Shell ドキュメント